@saqadri saqadri commented Sep 19, 2025

Summary

  • Replace activity-bound elicitation/sampling with Temporal signals so
    workflows can truly pause and resume later.
  • Eliminate global, in-memory state for signal responses; use per-workflow
    deterministic state (SignalMailbox) and per-request signal names.
  • Keep synchronous request path for non-workflow contexts unchanged and
    backward compatible.

Problems Identified

  • Global state: A module-level dict (e.g., _workflow_signal_states) in
    workflow context is not persisted/replayed, and breaks determinism.
  • Awaiting in signal handlers: Workflow signal handlers must be fast and
    deterministic; awaiting arbitrary async calls is risky.
  • Non-determinism: workflow.wait_condition reading module-level state is
    non-replayable.
  • Activity timeouts: Modeling elicitation/sampling as activities leads to
    timeouts/retries; we want async server->client flow that pauses the workflow.

What Changed

  • Workflow path: server->client requests go async and return via signals.
    • In-workflow calls to SessionProxy.request(...) now:
      • Start an activity that triggers an async gateway request.
      • Receive a unique signal name.
      • Wait on that signal (via workflow-local mailbox), then return the
        payload.
  • App server: New async endpoint triggers the client call in the background,
    then signals the workflow with the result.
  • Backwards compatibility: request_via_proxy keeps its original positional
    signature and defaults to synchronous behavior, unless make_async_call=True
    is passed.
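As a rough illustration of the workflow path above, the pause/resume round-trip can be simulated in plain asyncio (SignalMailbox, relay_request_activity, and the payload shape here are stand-ins for the real classes, not the actual API):

```python
import asyncio
import uuid

class SignalMailbox:
    """Workflow-local stand-in: the dynamic signal handler pushes payloads,
    waiters observe them; no module-level globals."""
    def __init__(self):
        self._payloads = {}

    def push(self, signal_name, payload):
        # Mirrors a @workflow.signal(dynamic=True) handler: synchronous, no awaits.
        self._payloads[signal_name] = payload

    async def wait_for(self, signal_name):
        # Stand-in for workflow.wait_condition observing mailbox state.
        while signal_name not in self._payloads:
            await asyncio.sleep(0)
        return self._payloads.pop(signal_name)

async def relay_request_activity(method, params):
    # Stand-in for the relay activity in async mode: it would enqueue the
    # gateway call and hand back a unique per-request signal name.
    return f"mcp_rpc_{method}_{uuid.uuid4().hex}"

async def workflow_request(mailbox, method, params):
    signal_name = await relay_request_activity(method, params)
    # In the real flow the app server signals the workflow once the client
    # responds; simulate that delivery on the next loop iteration.
    asyncio.get_running_loop().call_soon(mailbox.push, signal_name, {"result": "ok"})
    return await mailbox.wait_for(signal_name)

result = asyncio.run(workflow_request(SignalMailbox(), "elicitation/create", {}))
```

The point of the sketch is the shape of the flow: the activity does the I/O and returns only a signal name, and the workflow blocks on in-workflow state until the payload arrives.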

Code Pointers (What to Improve / Review)

  • src/mcp_agent/executor/temporal/session_proxy.py
    • SessionProxy.request(...):
      • Workflow path now executes activity mcp_relay_request
        with make_async_call=True, gets signal_name, then waits via
        context.executor.wait_for_signal(signal_name, workflow_id, run_id).
      • Non-workflow path still calls the sync gateway endpoint and awaits
        the result.
  • Review to ensure no reads from global state or locks in workflow context.
  • src/mcp_agent/executor/temporal/system_activities.py
    • mcp_relay_request(...): now accepts (make_async_call, execution_id,
      method, params).
      • If async: generate a unique signal name (e.g., mcp_rpc_{method}_{uuid}),
        call the async server endpoint, and return signal_name.
      • If sync: call the synchronous request endpoint and return JSON.
  • src/mcp_agent/mcp/client_proxy.py
    • request_via_proxy(...):
      • New internal _request_via_proxy_impl with full kwargs.
      • Public request_via_proxy(*args, **kwargs) wrapper preserves positional
        calling (request_via_proxy("run", "m", {})) to keep tests working.
      • Async mode: posts to
        /internal/session/by-run/{workflow_id}/{execution_id}/async-request
        (workflow_id is determined from the Temporal activity context) and
        returns immediately (None); the signal carries the result later.
      • Sync mode: unchanged (/internal/session/by-run/{execution_id}/request),
        returns the JSON result.
  • src/mcp_agent/server/app_server.py
    • _install_internal_routes(...):
      • New async endpoint
        POST /internal/session/by-run/{workflow_id}/{execution_id}/async-request:
        • Validates auth, enqueues a background task, calls the client via a
          session (prefers latest > mapped), then signals the workflow with
          signal_name and the result payload.
      • Shared helpers: _check_gateway_auth, _handle_request_via_rpc,
        _handle_specific_request, _try_session_request (used by both sync/async
        routes).
      • Synchronous /request and /notify endpoints preserved and refactored
        for clarity.
  • src/mcp_agent/executor/workflow.py
    • Confirm we rely on the existing @workflow.signal(dynamic=True) handler
      and SignalMailbox for deterministic signal handling (no awaits, no globals).
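A minimal sketch of the deterministic mailbox pattern this bullet asks to confirm (field names and method signatures are assumptions, not the real SignalMailbox API): the dynamic handler only mutates instance state, and the wait predicate reads only that state.

```python
class SignalMailbox:
    """Per-workflow deterministic signal store: state lives on the workflow
    instance and changes only via signal delivery, so replay is safe."""
    def __init__(self):
        self._payloads = {}
        self._version = 0  # monotonically increasing on every push

    def push(self, signal_name, payload):
        # Body of the @workflow.signal(dynamic=True) handler: fast, no awaits.
        self._payloads[signal_name] = payload
        self._version += 1

    @property
    def version(self):
        return self._version

    def take(self, signal_name):
        return self._payloads.pop(signal_name)

mailbox = SignalMailbox()
seen = mailbox.version
# A wait_condition-style predicate that reads only in-workflow state:
ready = lambda: mailbox.version > seen

mailbox.push("mcp_rpc_elicit_abc123", {"answer": 42})
was_ready = ready()
payload = mailbox.take("mcp_rpc_elicit_abc123")
```

Because the predicate compares version counters rather than touching module-level globals, Temporal can re-evaluate it identically during replay.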

Recommendations / TODOs

  • Do not add global response caches (e.g., _workflow_signal_states) for
    workflows. Use the workflow’s SignalMailbox.
  • Avoid await inside workflow signal handlers. The dynamic handler pushes
    values into the mailbox; wait_condition observes mailbox version changes
    (deterministic).
  • Use per-request signal names for correlation. You can standardize a prefix
    like mcp_rpc_{method}_{uuid}.
  • Keep synchronous paths untouched for non-workflow contexts; only opt-in to
    async behavior when actually inside a Temporal workflow.
  • Apply consistent auth across internal endpoints via a single helper (see
    _check_gateway_auth usage).
  • Document the async endpoint contract:
    • Request body: {"method": "...", "params": {...}, "signal_name": "..."}.
    • Returns immediately with {"status": "received", ...}; result arrives
      via signal.
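That contract can be sketched as a plain function (hypothetical handler and status codes, mirroring the bullets above rather than the real route):

```python
import uuid

def handle_async_request(path_params: dict, body: dict) -> tuple[int, dict]:
    """Validate the async-request contract and return an immediate ack;
    the real result is delivered later via a workflow signal."""
    signal_name = body.get("signal_name")
    if not signal_name:
        return 400, {"error": "missing_signal_name"}
    if not body.get("method"):
        return 400, {"error": "missing_method"}
    return 200, {
        "status": "received",
        "execution_id": path_params.get("execution_id"),
        "signal_name": signal_name,
    }

status, ack = handle_async_request(
    {"workflow_id": "wf-1", "execution_id": "run-1"},
    {
        "method": "elicitation/create",
        "params": {},
        "signal_name": f"mcp_rpc_elicit_{uuid.uuid4().hex}",
    },
)
```

The ack carries the signal_name back so callers can correlate the eventual signal with the request they enqueued.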

Compatibility Notes

  • request_via_proxy("run", "m", {}) still works. To use async behavior, call
    request_via_proxy(..., make_async_call=True, signal_name=...) from activity
    context.
  • SessionProxy.request constructs the correct call path automatically when it
    detects workflow runtime; no changes needed at call sites in workflows.

Why This Is Temporal-Compliant

  • State lives on the workflow instance, and signals are persisted and replayed
    by Temporal.
  • wait_condition predicates read only deterministic, in-workflow state
    (mailbox version), not module-level globals.
  • Activities perform all I/O; workflows pause on signals and resume
    deterministically.

Potential Follow-ups

  • Standardize error payloads { "error": str } for both sync and async paths.
  • Add unit tests around the async endpoint (with a mocked Temporal client
    handle) and per-request signal correlation.
  • Consider config flags to explicitly enable/disable async RPCs if you want to
    roll out gradually.

If you want, I can put this into a branch and push a minimal PR body with
these bullets and references to the updated functions so your teammate has
clear pointers.

Summary by CodeRabbit

  • New Features

    • Added asynchronous request mode for session/gateway interactions, enabling non-blocking operations with immediate acknowledgements and workflow-delivered results.
    • Introduced an internal async-request endpoint to handle long-running tasks without holding connections open.
    • Maintains backward compatibility; synchronous behavior remains unchanged.
  • Refactor

    • Centralized internal gateway authentication for consistency across endpoints.
    • Unified internal session request routing with RPC or typed fallbacks for improved reliability.


coderabbitai bot commented Sep 19, 2025

Walkthrough

Adds an asynchronous request flow across Temporal workflow, activities, gateway proxy, and app server: activities can dispatch async gateway requests with a generated signal name; workflows await a matching signal for the response. Introduces a new internal async-request route, centralized auth checks, helper routing, and updated signatures for relay and proxy functions. Synchronous flow remains.

Changes

Cohort / File(s) Summary of Changes
Temporal Session Proxy (Workflow path)
src/mcp_agent/executor/temporal/session_proxy.py
In workflow path, pass make_async_call to executor.execute and capture returned signal_name; fetch workflow/run ids; wait for signal payload named signal_name; return payload. Non-workflow path calls relay with make_async_call=False.
Temporal System Activities
src/mcp_agent/executor/temporal/system_activities.py
relay_request now supports async via make_async_call flag. When async, generate UUID signal_name, call request_via_proxy with async controls, and return signal_name; otherwise, perform synchronous request and return result. Signature and return type updated.
MCP Client Proxy
src/mcp_agent/mcp/client_proxy.py
Split into internal _request_via_proxy_impl and a backward-compatible wrapper request_via_proxy. Added make_async_call and signal_name controls. Async path posts to /internal/session/by-run/{workflow_id}/{execution_id}/async-request using Temporal activity context; returns None on enqueue. Sync path unchanged.
App Server (Internal routes/auth/async)
src/mcp_agent/server/app_server.py
Added _check_gateway_auth. Refactored internal request handling via helpers (_handle_request_via_rpc, _handle_specific_request, _try_session_request). Updated /internal/session/by-run/{execution_id}/request to use helpers and auth. Added /internal/session/by-run/{workflow_id}/{execution_id}/async-request to dispatch RPC asynchronously and signal workflow with success/error.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant WF as Temporal Workflow (SessionProxy)
  participant ACT as SystemActivities.relay_request
  participant GW as App Server (internal)
  participant CL as Connected Client

  rect rgb(240,248,255)
    note over WF: Async request path
    WF->>ACT: execute(make_async_call=True, method, params)
    ACT->>GW: async-request(workflow_id, execution_id, method, params, signal_name)
    GW-->>ACT: 202 Accepted (ack)
    ACT-->>WF: signal_name
    par Client handling
      GW->>CL: RPC async method/params
      CL-->>GW: Result or error
      GW-->>WF: Signal signal_name with payload
    end
    WF-->>WF: wait_for_signal(signal_name)
    WF-->>WF: Resume with payload
  end
sequenceDiagram
  autonumber
  participant CALLER as Non-workflow Caller
  participant ACT as SystemActivities.relay_request
  participant GW as App Server (internal)
  participant CL as Connected Client

  rect rgb(245,245,245)
    note over CALLER: Synchronous request path (unchanged)
    CALLER->>ACT: relay_request(make_async_call=False, method, params)
    ACT->>GW: request(execution_id, method, params)
    GW->>CL: RPC method/params
    CL-->>GW: Result
    GW-->>ACT: Result JSON
    ACT-->>CALLER: Result JSON
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • roman-van-der-krogt
  • rholinshead

Poem

I thump my paws: async at last!
Signals hop back, responses fast.
Through tunnels of workflow, whispers ring—
A UUID is spring’s new string.
Gateway nods, the client sings,
And in the burrow, green lights blink. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 41.18%, which is insufficient; the required threshold is 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The title clearly and concisely captures the primary change: switching elicitation and sampling to use a per-workflow signal mailbox so workflows can pause and resume deterministically; this matches the PR objectives and file-level changes. It is specific to the main feature and avoids noisy file lists or vague wording, so a reviewer scanning history will understand the primary intent.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/mcp_agent/executor/temporal/system_activities.py (1)

92-133: Don’t ignore enqueue failures in async path.

If request_via_proxy returns {"error": ...}, the workflow will hang forever waiting for a signal that will never arrive. Fail fast.

         if make_async_call:
             # Create a unique signal name for this request
             signal_name = f"mcp_rpc_{method}_{uuid.uuid4().hex}"
-            await request_via_proxy(
+            ack = await request_via_proxy(
                 make_async_call=True,
                 execution_id=execution_id,
                 method=method,
                 params=params or {},
                 signal_name=signal_name,
                 gateway_url=gateway_url,
                 gateway_token=gateway_token,
             )
+            if isinstance(ack, dict) and ack.get("error"):
+                raise RuntimeError(f"async-request enqueue failed: {ack.get('error')}")
             return signal_name
🧹 Nitpick comments (6)
src/mcp_agent/server/app_server.py (3)

392-395: Standardize unauthorized payload shape.

Other internal endpoints use {"ok": False, "error": "..."}; keep it consistent to avoid client regressions.

Use the diff above which already aligns the payload.


604-636: Session binding logic LGTM.

Rebinding on success is a nice touch; small nit: consider logging when register_session=False for parity.


662-666: Unify auth checks across all internal endpoints.

/internal/workflows/log and /internal/human/prompts still implement bespoke auth. Please switch them to _check_gateway_auth for consistency.

Apply:

@@ async def _internal_workflows_log(request: Request):
-            # Optional shared-secret auth
-            gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
-            if gw_token:
-                bearer = request.headers.get("Authorization", "")
-                ...
-                    return JSONResponse(
-                        {"ok": False, "error": "unauthorized"}, status_code=401
-                    )
+            auth_err = _check_gateway_auth(request)
+            if auth_err:
+                return auth_err
@@ async def _internal_human_prompts(request: Request):
-            # Optional shared-secret auth
-            gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
-            if gw_token:
-                bearer = request.headers.get("Authorization", "")
-                ...
-                    return JSONResponse({"error": "unauthorized"}, status_code=401)
+            auth_err = _check_gateway_auth(request)
+            if auth_err:
+                return auth_err
src/mcp_agent/mcp/client_proxy.py (2)

195-220: Honor “no timeout” when MCP_GATEWAY_REQUEST_TIMEOUT <= 0.

Currently only parse errors become no-timeout; 0 or negative values should too.

-        timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
-        if timeout_str is None:
-            timeout = httpx.Timeout(None)
-        else:
-            try:
-                timeout = float(str(timeout_str).strip())
-            except Exception:
-                timeout = httpx.Timeout(None)
+        timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
+        timeout_val = None
+        if timeout_str is not None:
+            try:
+                timeout_val = float(str(timeout_str).strip())
+            except Exception:
+                timeout_val = None
+        timeout = httpx.Timeout(None) if (timeout_val is None or timeout_val <= 0) else timeout_val

Also consider aligning with MCP_GATEWAY_TIMEOUT naming used elsewhere, or documenting both.


221-252: Same <=0 timeout handling for sync path.

Mirror the logic to avoid accidental immediate timeouts.

-    if timeout_str is None:
-        timeout_float = None  # no timeout by default; activity timeouts still apply
-    else:
-        try:
-            timeout_float = float(str(timeout_str).strip())
-        except Exception:
-            timeout_float = None
+    timeout_float = None
+    if timeout_str is not None:
+        try:
+            timeout_float = float(str(timeout_str).strip())
+        except Exception:
+            timeout_float = None
@@
-        if timeout_float is None:
+        if timeout_float is None or timeout_float <= 0:
             timeout = httpx.Timeout(None)
         else:
             timeout = timeout_float
src/mcp_agent/executor/temporal/session_proxy.py (1)

118-123: Remove duplicate import of workflow alias.

_module import exists at Line 12; drop the inner import.

-            from temporalio import workflow as _twf  # type: ignore
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 38a731f and 6cfc614.

📒 Files selected for processing (4)
  • src/mcp_agent/executor/temporal/session_proxy.py (1 hunks)
  • src/mcp_agent/executor/temporal/system_activities.py (2 hunks)
  • src/mcp_agent/mcp/client_proxy.py (2 hunks)
  • src/mcp_agent/server/app_server.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/mcp_agent/executor/temporal/system_activities.py (1)
src/mcp_agent/mcp/client_proxy.py (1)
  • request_via_proxy (255-277)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/temporal/session_proxy.py (3)
  • request (108-148)
  • request (347-350)
  • send_request (170-191)
src/mcp_agent/app.py (3)
  • logger (204-221)
  • context (160-165)
  • executor (176-177)
src/mcp_agent/executor/temporal/workflow_signal.py (1)
  • signal (146-195)
src/mcp_agent/executor/temporal/session_proxy.py (6)
src/mcp_agent/app.py (2)
  • workflow (433-466)
  • executor (176-177)
src/mcp_agent/executor/task_registry.py (1)
  • get_activity (25-28)
src/mcp_agent/executor/temporal/__init__.py (1)
  • execute (212-228)
src/mcp_agent/executor/workflow.py (1)
  • executor (118-123)
src/mcp_agent/executor/temporal/workflow_signal.py (1)
  • wait_for_signal (71-127)
src/mcp_agent/executor/temporal/system_activities.py (1)
  • relay_request (93-132)
🔇 Additional comments (8)
src/mcp_agent/server/app_server.py (2)

535-550: Helper looks good.

Good RPC-first approach with clean logging; clear return semantics.


551-603: Typed fallback mapping LGTM.

Controlled allow‑list for typed methods is clear and safe.

src/mcp_agent/mcp/client_proxy.py (3)

151-168: Wrapper and docstring LGTM.

Clear async vs sync behavior and return type contract.


175-185: Activity-context workflow_id inference is correct.

Good guardrail for misuse outside activity.


254-278: Backward-compat wrapper LGTM.

Signature preservation for positional args is helpful.

src/mcp_agent/executor/temporal/session_proxy.py (2)

144-148: Sync fallback call LGTM.

Parameter order matches SystemActivities.relay_request(make_async_call=False, ...).


133-141: Confirm executor.wait_for_signal signature accepts run_id and signal_description.

Found async wait_for_signal implementations in src/mcp_agent/executor/executor.py (around lines ~186 and ~403) and the Signal object exposes .description (src/mcp_agent/executor/workflow_signal.py:225). Verify executor.wait_for_signal accepts a signal_description kwarg (or else construct/pass a Signal with description) — call site: src/mcp_agent/executor/temporal/session_proxy.py:135.

src/mcp_agent/executor/temporal/system_activities.py (1)

4-4: uuid import LGTM.

Comment on lines +125 to 131
signal_name = await self._executor.execute(
    act,
    True,  # make_async_call
    exec_id,
    method,
    params or {},
)

⚠️ Potential issue

Propagate activity errors returned by executor.execute.

TemporalExecutor.execute may return a BaseException; handle it to avoid waiting on a bogus signal_name.

-            signal_name = await self._executor.execute(
+            result = await self._executor.execute(
                 act,
                 True,  # make_async_call
                 exec_id,
                 method,
                 params or {},
             )
+            if isinstance(result, BaseException):
+                raise result
+            signal_name = result  # expected to be str
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
signal_name = await self._executor.execute(
    act,
    True,  # make_async_call
    exec_id,
    method,
    params or {},
)
result = await self._executor.execute(
    act,
    True,  # make_async_call
    exec_id,
    method,
    params or {},
)
if isinstance(result, BaseException):
    raise result
signal_name = result  # expected to be str
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/session_proxy.py around lines 125 to 131, the
value returned by self._executor.execute may be a BaseException and the code
currently treats it as a valid signal_name; detect when the result is an
exception and re-raise it (or otherwise propagate it) instead of proceeding to
await/use it as a signal name. Update the code to check isinstance(result,
BaseException) right after the await and raise the exception if so, ensuring
calling context observes the error rather than waiting on a bogus signal_name.

Comment on lines +361 to +379
# Helper function for shared authentication across internal endpoints
def _check_gateway_auth(request: Request) -> JSONResponse | None:
gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
if not gw_token:
return None
bearer = request.headers.get("Authorization", "")
bearer_token = (
bearer.split(" ", 1)[1]
if bearer.lower().startswith("bearer ")
else ""
)
header_tok = request.headers.get("X-MCP-Gateway-Token", "")
if not (
secrets.compare_digest(header_tok, gw_token)
or secrets.compare_digest(bearer_token, gw_token)
):
return JSONResponse({"error": "unauthorized"}, status_code=401)
return None


⚠️ Potential issue

Make gateway auth mandatory (current logic leaves routes unauthenticated if MCP_GATEWAY_TOKEN is unset).

As written, internal endpoints are publicly callable whenever MCP_GATEWAY_TOKEN isn’t set. That’s a security risk for signal injection and request relaying.

Apply:

-        def _check_gateway_auth(request: Request) -> JSONResponse | None:
+        def _check_gateway_auth(request: Request) -> JSONResponse | None:
             gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
-            if not gw_token:
-                return None
+            if not gw_token:
+                # Require explicit opt-out (dev only) to run without auth
+                if os.environ.get("MCP_GATEWAY_AUTH_OPTIONAL", "").lower() not in ("1", "true", "yes"):
+                    return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
+                return None
@@
-            if not (
+            if not (
                 secrets.compare_digest(header_tok, gw_token)
                 or secrets.compare_digest(bearer_token, gw_token)
             ):
-                return JSONResponse({"error": "unauthorized"}, status_code=401)
+                return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
             return None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Helper function for shared authentication across internal endpoints
def _check_gateway_auth(request: Request) -> JSONResponse | None:
    gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
    if not gw_token:
        return None
    bearer = request.headers.get("Authorization", "")
    bearer_token = (
        bearer.split(" ", 1)[1]
        if bearer.lower().startswith("bearer ")
        else ""
    )
    header_tok = request.headers.get("X-MCP-Gateway-Token", "")
    if not (
        secrets.compare_digest(header_tok, gw_token)
        or secrets.compare_digest(bearer_token, gw_token)
    ):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    return None
# Helper function for shared authentication across internal endpoints
def _check_gateway_auth(request: Request) -> JSONResponse | None:
    gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
    if not gw_token:
        # Require explicit opt-out (dev only) to run without auth
        if os.environ.get("MCP_GATEWAY_AUTH_OPTIONAL", "").lower() not in ("1", "true", "yes"):
            return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
        return None
    bearer = request.headers.get("Authorization", "")
    bearer_token = (
        bearer.split(" ", 1)[1]
        if bearer.lower().startswith("bearer ")
        else ""
    )
    header_tok = request.headers.get("X-MCP-Gateway-Token", "")
    if not (
        secrets.compare_digest(header_tok, gw_token)
        or secrets.compare_digest(bearer_token, gw_token)
    ):
        return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
    return None
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 361 to 379, the helper
_check_gateway_auth currently treats missing MCP_GATEWAY_TOKEN as "no auth
required" which allows unauthenticated access; change the logic so that if
os.environ.get("MCP_GATEWAY_TOKEN") is falsy the function immediately returns a
401 JSONResponse (e.g., {"error": "unauthorized"}) instead of None, and
otherwise continue to validate the Authorization and X-MCP-Gateway-Token headers
using secrets.compare_digest; this ensures routes are rejected when the gateway
token is not configured.

Comment on lines +855 to +947
@mcp_server.custom_route(
    "/internal/session/by-run/{workflow_id}/{execution_id}/async-request",
    methods=["POST"],
    include_in_schema=False,
)
async def _relay_async_request(request: Request):
    """Start an async RPC to the connected client and signal the workflow with the result.
    Body: { method: str, params: dict, signal_name: str }
    Path: workflow_id, execution_id (run_id)
    """
    body = await request.json()
    execution_id = request.path_params.get("execution_id")
    workflow_id = request.path_params.get("workflow_id")
    method = body.get("method")
    params = body.get("params") or {}
    signal_name = body.get("signal_name")

    # Auth
    auth_err = _check_gateway_auth(request)
    if auth_err:
        return auth_err

    if not signal_name:
        return JSONResponse({"error": "missing_signal_name"}, status_code=400)

    async def _do_async():
        result: Dict[str, Any] | None = None
        error: str | None = None
        try:
            # Try latest session first
            latest_session = _get_fallback_upstream_session()
            if latest_session is not None:
                try:
                    result = await _try_session_request(
                        latest_session,
                        method,
                        params,
                        execution_id,
                        log_prefix="async-request",
                        register_session=True,
                    )
                except Exception as e_latest:
                    try:
                        logger.warning(
                            f"[async-request] latest session failed for execution_id={execution_id} method={method}: {e_latest}"
                        )
                    except Exception:
                        pass

            # Fallback to mapped session
            if result is None:
                session = await _get_session(execution_id)
                if not session:
                    error = "session_not_available"
                else:
                    try:
                        result = await _try_session_request(
                            session,
                            method,
                            params,
                            execution_id,
                            log_prefix="async-request",
                            register_session=False,
                        )
                    except Exception as e_sess:
                        error = str(e_sess)
        except Exception as e:
            error = str(e)

        # Signal the workflow with the result or error
        try:
            app = _get_attached_app(mcp_server)
            if app and app.context and getattr(app.context, "executor", None):
                executor = app.context.executor
                client = getattr(executor, "client", None)
                if client and workflow_id and execution_id:
                    handle = client.get_workflow_handle(
                        workflow_id=workflow_id, run_id=execution_id
                    )
                    payload = result if error is None else {"error": error}
                    await handle.signal(signal_name, payload)
        except Exception as se:
            try:
                logger.error(f"[async-request] failed to signal workflow: {se}")
            except Exception:
                pass

    asyncio.create_task(_do_async())
    return JSONResponse(
        {"status": "received", "execution_id": execution_id, "signal_name": signal_name}
    )


⚠️ Potential issue

Ensure Temporal client before signaling; validate signal names.

  • executor.client may be None; call ensure_client() before get_workflow_handle to avoid runtime failures.
  • Basic signal_name validation avoids accidental whitespace/newline issues.
         async def _relay_async_request(request: Request):
@@
-            async def _do_async():
+            async def _do_async():
                 result: Dict[str, Any] | None = None
                 error: str | None = None
@@
-                # Signal the workflow with the result or error
+                # Signal the workflow with the result or error
                 try:
                     app = _get_attached_app(mcp_server)
                     if app and app.context and getattr(app.context, "executor", None):
                         executor = app.context.executor
-                        client = getattr(executor, "client", None)
+                        await executor.ensure_client()
+                        client = getattr(executor, "client", None)
                         if client and workflow_id and execution_id:
                             handle = client.get_workflow_handle(
                                 workflow_id=workflow_id, run_id=execution_id
                             )
                             payload = result if error is None else {"error": error}
                             await handle.signal(signal_name, payload)
@@
-            asyncio.create_task(_do_async())
+            # Validate signal name quickly
+            if not isinstance(signal_name, str) or not signal_name.strip() or any(c in signal_name for c in ("\r", "\n")):
+                return JSONResponse({"error": "invalid_signal_name"}, status_code=400)
+            asyncio.create_task(_do_async())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@mcp_server.custom_route(
    "/internal/session/by-run/{workflow_id}/{execution_id}/async-request",
    methods=["POST"],
    include_in_schema=False,
)
async def _relay_async_request(request: Request):
    """Start an async RPC to the connected client and signal the workflow with the result.
    Body: { method: str, params: dict, signal_name: str }
    Path: workflow_id, execution_id (run_id)
    """
    body = await request.json()
    execution_id = request.path_params.get("execution_id")
    workflow_id = request.path_params.get("workflow_id")
    method = body.get("method")
    params = body.get("params") or {}
    signal_name = body.get("signal_name")
    # Auth
    auth_err = _check_gateway_auth(request)
    if auth_err:
        return auth_err
    if not signal_name:
        return JSONResponse({"error": "missing_signal_name"}, status_code=400)
    async def _do_async():
        result: Dict[str, Any] | None = None
        error: str | None = None
        try:
            # Try latest session first
            latest_session = _get_fallback_upstream_session()
            if latest_session is not None:
                try:
                    result = await _try_session_request(
                        latest_session,
                        method,
                        params,
                        execution_id,
                        log_prefix="async-request",
                        register_session=True,
                    )
                except Exception as e_latest:
                    try:
                        logger.warning(
                            f"[async-request] latest session failed for execution_id={execution_id} method={method}: {e_latest}"
                        )
                    except Exception:
                        pass
            # Fallback to mapped session
            if result is None:
                session = await _get_session(execution_id)
                if not session:
                    error = "session_not_available"
                else:
                    try:
                        result = await _try_session_request(
                            session,
                            method,
                            params,
                            execution_id,
                            log_prefix="async-request",
                            register_session=False,
                        )
                    except Exception as e_sess:
                        error = str(e_sess)
        except Exception as e:
            error = str(e)
        # Signal the workflow with the result or error
        try:
            app = _get_attached_app(mcp_server)
            if app and app.context and getattr(app.context, "executor", None):
                executor = app.context.executor
                client = getattr(executor, "client", None)
                if client and workflow_id and execution_id:
                    handle = client.get_workflow_handle(
                        workflow_id=workflow_id, run_id=execution_id
                    )
                    payload = result if error is None else {"error": error}
                    await handle.signal(signal_name, payload)
        except Exception as se:
            try:
                logger.error(f"[async-request] failed to signal workflow: {se}")
            except Exception:
                pass
    asyncio.create_task(_do_async())
    return JSONResponse(
        {"status": "received", "execution_id": execution_id, "signal_name": signal_name}
    )
After (indentation reconstructed):

```python
@mcp_server.custom_route(
    "/internal/session/by-run/{workflow_id}/{execution_id}/async-request",
    methods=["POST"],
    include_in_schema=False,
)
async def _relay_async_request(request: Request):
    """Start an async RPC to the connected client and signal the workflow with the result.

    Body: { method: str, params: dict, signal_name: str }
    Path: workflow_id, execution_id (run_id)
    """
    body = await request.json()
    execution_id = request.path_params.get("execution_id")
    workflow_id = request.path_params.get("workflow_id")
    method = body.get("method")
    params = body.get("params") or {}
    signal_name = body.get("signal_name")
    # Auth
    auth_err = _check_gateway_auth(request)
    if auth_err:
        return auth_err
    if not signal_name:
        return JSONResponse({"error": "missing_signal_name"}, status_code=400)

    async def _do_async():
        result: Dict[str, Any] | None = None
        error: str | None = None
        try:
            # Try latest session first
            latest_session = _get_fallback_upstream_session()
            if latest_session is not None:
                try:
                    result = await _try_session_request(
                        latest_session,
                        method,
                        params,
                        execution_id,
                        log_prefix="async-request",
                        register_session=True,
                    )
                except Exception as e_latest:
                    try:
                        logger.warning(
                            f"[async-request] latest session failed for execution_id={execution_id} method={method}: {e_latest}"
                        )
                    except Exception:
                        pass
            # Fallback to mapped session
            if result is None:
                session = await _get_session(execution_id)
                if not session:
                    error = "session_not_available"
                else:
                    try:
                        result = await _try_session_request(
                            session,
                            method,
                            params,
                            execution_id,
                            log_prefix="async-request",
                            register_session=False,
                        )
                    except Exception as e_sess:
                        error = str(e_sess)
        except Exception as e:
            error = str(e)
        # Signal the workflow with the result or error
        try:
            app = _get_attached_app(mcp_server)
            if app and app.context and getattr(app.context, "executor", None):
                executor = app.context.executor
                await executor.ensure_client()
                client = getattr(executor, "client", None)
                if client and workflow_id and execution_id:
                    handle = client.get_workflow_handle(
                        workflow_id=workflow_id, run_id=execution_id
                    )
                    payload = result if error is None else {"error": error}
                    await handle.signal(signal_name, payload)
        except Exception as se:
            try:
                logger.error(f"[async-request] failed to signal workflow: {se}")
            except Exception:
                pass

    # Validate signal name quickly
    if not isinstance(signal_name, str) or not signal_name.strip() or any(c in signal_name for c in ("\r", "\n")):
        return JSONResponse({"error": "invalid_signal_name"}, status_code=400)
    asyncio.create_task(_do_async())
    return JSONResponse(
        {"status": "received", "execution_id": execution_id, "signal_name": signal_name}
    )
```
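The inline signal-name check in the suggested change can be factored into a small predicate, which is easier to unit-test than the compound condition (`is_valid_signal_name` is a hypothetical helper, not part of the PR):

```python
def is_valid_signal_name(signal_name: object) -> bool:
    """Mirror of the suggested inline check: signal_name must be a
    non-empty, non-whitespace string containing no CR/LF characters."""
    return (
        isinstance(signal_name, str)
        and bool(signal_name.strip())
        and not any(c in signal_name for c in ("\r", "\n"))
    )
```

Rejecting CR/LF guards against log-injection and malformed signal names reaching the Temporal client; rejecting non-strings keeps a JSON body like `{"signal_name": 123}` from slipping through.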
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 855 to 947, the code may call
executor.client.get_workflow_handle when executor.client is None and passes
unvalidated signal_name; update the signaling block to (1) validate signal_name
by trimming and ensuring it's non-empty and contains no newlines/only allowed
characters before proceeding, and (2) call executor.ensure_client() (or
equivalent) and re-check client existence before calling get_workflow_handle; if
ensure_client() raises or client is still None, log and skip signaling
gracefully to avoid runtime failures.
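The second point in that prompt — re-check the client after `ensure_client()` and skip signaling gracefully — can be sketched as below. The executor and client here are stubs for illustration; `ensure_client` is assumed from the surrounding code, and the `client.signal(...)` call stands in for the real `get_workflow_handle(...).signal(...)` path.

```python
import asyncio
from typing import Any, Optional


class StubExecutor:
    """Stand-in for the Temporal executor: ensure_client may raise, or
    succeed while still leaving no client attached."""

    def __init__(self, client: Optional[Any] = None, raise_on_connect: bool = False):
        self.client = client
        self._raise = raise_on_connect

    async def ensure_client(self) -> None:
        if self._raise:
            raise RuntimeError("temporal unreachable")


async def signal_if_possible(executor: StubExecutor, signal_name: str, payload: Any) -> bool:
    """Return True if the workflow was signaled, False if we skipped gracefully."""
    try:
        await executor.ensure_client()
    except Exception:
        return False  # log and skip rather than crash the background relay task
    client = getattr(executor, "client", None)
    if client is None:
        return False  # ensure_client succeeded but no client is attached
    await client.signal(signal_name, payload)  # stand-in for handle.signal(...)
    return True
```

The key property is that every failure mode returns `False` instead of raising, so the fire-and-forget `_do_async()` task never dies with an unhandled exception.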

@saqadri saqadri closed this Sep 19, 2025
@rholinshead
Member

@saqadri was this intentional to close? It got my hopes up that this might fix the resume signal issue noticed in the mcp_agent_server temporal example

Collaborator Author

saqadri commented Sep 19, 2025

Haha, @rholinshead this is just some work I was collaborating with @roman-van-der-krogt on — his PRs, #507 and #502 have applied these changes.

I still haven’t looked at the resume issue

@rholinshead
Member

> Haha, @rholinshead this is just some work I was collaborating with @roman-van-der-krogt on — his PRs, #507 and #502 have applied these changes.
>
> I still haven’t looked at the resume issue

Ah, ok -- was thinking that the proxying might fix it as well since it seems the mailbox version isn't getting bumped (at least for the mailbox instance that the run is waiting on). I can retest once it gets merged & deployed just to see

Collaborator Author

saqadri commented Sep 19, 2025

@roman-van-der-krogt did you try out the PauseResumeWorkflow in the temporal example during your testing? Probably can keep those separate issues, but I think Ryan found that the resume signal wasn't working.

@roman-van-der-krogt
Contributor

> @roman-van-der-krogt did you try out the PauseResumeWorkflow in the temporal example during your testing? Probably can keep those separate issues, but I think Ryan found that the resume signal wasn't working.

I did not, but I can have a look at it after cleaning up #507
